Source code for hysop.core.graph.computational_node

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
@file graph_operator.py
Base for directionally splitted advection solvers (pure-python and GPU version).
"""

import copy
import warnings
from abc import ABCMeta, abstractmethod

from hysop import dprint
from hysop.tools.htypes import InstanceOf, to_set, check_instance, first_not_None
from hysop.tools.io_utils import IOParams
from hysop.tools.parameters import MPIParams
from hysop.parameters.parameter import Parameter
from hysop.fields.continuous_field import Field, ScalarField, TensorField
from hysop.core.graph.node_requirements import NodeRequirements
from hysop.core.graph.graph import (
    wraps,
    not_initialized,
    initialized,
    discretized,
    ready,
)
from hysop.core.graph.continuous import OperatorBase
from hysop.topology.topology import Topology, TopologyView
from hysop.tools.decorators import debug
from hysop.tools.warning import HysopWarning
from hysop.topology.cartesian_descriptor import get_topo_descriptor_discretization


[docs] def base_initialized(f): assert callable(f) @wraps(f) def _check(*args, **kwds): self = args[0] msg = "Cannot call {}.{}() on node '{}' because {}".format( self.__class__.__name__, f.__name__, self.name, "{}" ) if not self._base_initialized: reason = "this self._init_base() has not been called yet." raise RuntimeError(msg.format(reason)) return f(*args, **kwds) return _check
[docs] def topology_handled(f): assert callable(f) @wraps(f) def _check(*args, **kwds): self = args[0] msg = "Cannot call {}.{}() on node '{}' because {}".format( self.__class__.__name__, f.__name__, self.name, "{}" ) if not self.topology_handled: reason = "this self.handle_topologies() has not been called yet." raise RuntimeError(msg.format(reason)) return f(*args, **kwds) return _check
[docs] def to_be_skipped_default(*args, **kwargs): """Default function for skipping operator's apply""" return False
[docs] class ComputationalGraphNode(OperatorBase, metaclass=ABCMeta): """ Interface of an abstract computational graph node. """ @debug def __new__( cls, input_fields=None, output_fields=None, input_params=None, output_params=None, input_tensor_fields=None, output_tensor_fields=None, name=None, pretty_name=None, method=None, to_be_skipped_func=None, **kwds, ): return super().__new__( cls, name=None, fields=None, tensor_fields=None, parameters=None, **kwds ) @debug def __init__( self, input_fields=None, output_fields=None, input_params=None, output_params=None, input_tensor_fields=None, output_tensor_fields=None, name=None, pretty_name=None, method=None, to_be_skipped_func=None, **kwds, ): """ Initialize a ComputationalGraphNode. Parameters ---------- input_fields: dict, optional input fields as a dictionnary (see Notes). output_fields: dict, optional output fields as a dictionnary (see Notes). input_params: array like of hysop.parameters.Parameter or dict, optional (see Notes) input parameters as a set or a dictionnary. output_params: array like of hysop.parameters.Parameter or dict, optional (see Notes) output parameters as a set or a dictionnary. input_tensor_fields: tuple, optional input tensor fields as a tuple. If given, input_fields is assumed to contain only ScalarFields. Else they are deduced from input_fields. output_tensor_fields: tuple, optional output tensor fields as a tuple. If given, output_fields is assumed to contain only ScalarFields. Else they are deduced from output_fields. name: str, optional name of this node (string), optional, defaults to top class name. Default name will be the top class name (ie. self.__class__.__name__). pretty_name: str, optional Pretty name of this node (string), optional, defaults to name. method: dict, optional user method specification for this graph node, optional, defaults to None. kwds: arguments for base classes (mpi_params and io_params). Attributes ---------- name: str name of this node (used for printing and display purpose). pretty_name: str Pretty name of this node (used for printing and display purpose). input_fields: dict input fields as a dictionnary (see Notes). output_fields: dict output fields as a dictionnary (see Notes). base_method: dict base method specification for this graph node. initialized: bool flag set after initialize() has been called. discretized: bool flag set after discretize() has been called. ready: bool flag set after setup() has been called. method : dict(MethodKey, MethodValue) method, set after initialize() has been called. input_field_requirements : dict(Field, DiscreteFieldRequirements) input constraints, set after initialize() has been called. output_field_requirements = {} output constraints, set after initialize() has been called. Notes ----- For the input and output fields, the keys of the dicts have to be of type :class:`hysop.fields.continuous_field.Field`. and the values should consist of :class:`hysop.topology.topology_descriptor.TopologyDescriptors` instances ie. an already defined topology or a topology descriptor. VectorFields and TensorFields are expanded to ScalarFields. For input and output parameters, in case of dict, the key is the actual Parameter and the value must be an MPIParams, or None Giving the following keywords as inputs (in **kwds) will throw a ValueError: input_vars, output_vars, variables, iwork, rwork, work, backend About the method parameter: One can not directly use the method parameter after this call. User method is put into attribute base_method awaiting the initialization step. See ComputationalGraphNode.handle_method() to see how method is handled. """ should_init = ( (input_fields is not None) or (output_fields is not None) or (input_params is not None) or (output_params is not None) ) # Check extra args cls = self.__class__ for _ in ("variables", "input_vars", "output_vars"): if _ in kwds.keys(): msg = ( "The '{}' parameter should not be used in {}, use input_fields and " ) msg += "output_fields instead." msg = msg.format(_, cls) raise ValueError(msg) if ("iwork" in kwds) or ("rwork" in kwds) or ("work" in kwds): msg = "work, rwork or iwork parameters can not be used before the full description \ of the graph in class {}.".format( cls ) raise ValueError(msg) if "backend" in kwds: msg = "{} is not a ComputationalGraphNodeFrontend thus no backend can be specified." msg = msg.format(cls) raise ValueError(msg) # Expand input and output TensorFields to ScalarFields # (a VectorField is just a TensorField so VectorFields are handled as well) if input_tensor_fields is not None: check_instance(input_tensor_fields, tuple, values=TensorField) check_instance(input_fields, dict, keys=ScalarField) for tfield in input_tensor_fields: for field in tfield: if field not in input_fields: msg = "Input fields and input tensor fields mismatch." raise RuntimeError(msg) elif input_fields is not None: input_tensor_fields = tuple( filter(lambda x: x.is_tensor, input_fields.keys()) ) input_fields = { sfield: topod for (tfield, topod) in input_fields.items() for sfield in tfield.fields } else: input_tensor_fields = () if output_tensor_fields is not None: check_instance(output_tensor_fields, tuple, values=TensorField) check_instance(output_fields, dict, keys=ScalarField) for tfield in output_tensor_fields: for field in tfield: if field not in output_fields: msg = "Output fields and output tensor fields mismatch." raise RuntimeError(msg) elif output_fields is not None: output_tensor_fields = tuple( filter(lambda x: x.is_tensor, output_fields.keys()) ) output_fields = { sfield: topod for (tfield, topod) in output_fields.items() for sfield in tfield.fields } else: output_tensor_fields = () # Check input values input_fields = first_not_None(input_fields, {}) output_fields = first_not_None(output_fields, {}) input_params = first_not_None(input_params, {}) output_params = first_not_None(output_params, {}) method = first_not_None(method, {}) name = first_not_None(name, self.__class__.__name__) pretty_name = first_not_None(pretty_name, name) if not isinstance(name, str): msg = "name is not a string but a {}." raise ValueError(msg.format(name.__class__)) if not isinstance(pretty_name, str): msg = "pretty_name is not a string but a {}." raise ValueError(msg.format(name.__class__)) if not isinstance(input_fields, dict): msg = "input_fields is not a dict but a {}." raise ValueError(msg.format(input_fields.__class__)) if not isinstance(output_fields, dict): msg = "output_fields is not a dict but a {}." raise ValueError(msg.format(output_fields.__class__)) if not isinstance(input_params, dict): input_params = to_set(input_params) input_params = {p: None for p in input_params} if not isinstance(output_params, dict): output_params = to_set(output_params) output_params = {p: None for p in output_params} self.name = name self.pretty_name = pretty_name self.input_fields = input_fields self.output_fields = output_fields self.input_params = input_params self.output_params = output_params self.input_tensor_fields = input_tensor_fields self.output_tensor_fields = output_tensor_fields self.base_method = method self.initialized = False self.topology_handled = False self.discretized = False self.ready = False self.input_discrete_fields = None self.output_discrete_fields = None self.discrete_fields = None self.input_discrete_tensor_fields = None self.output_discrete_tensor_fields = None self.discrete_tensor_fields = None if not hasattr(self, "_field_requirements"): self._field_requirements = None # graph builder hints to build I/O operators. self._input_fields_to_dump = [] self._output_fields_to_dump = [] self._input_params_to_dump = [] self._output_params_to_dump = [] self._base_initialized = False self.__kwds = kwds if should_init: self._init_base( input_fields, output_fields, input_tensor_fields, output_tensor_fields, input_params, output_params, ) else: # if we are a graph we still don't know input and output variables # => defer initialization of base class until full initialization. from hysop.core.graph.computational_graph import ComputationalGraph check_instance(self, ComputationalGraph) io_params = kwds.get("io_params", False) self.io_params = io_params self.mpi_params = kwds.get("mpi_params", None) self._set_io() if to_be_skipped_func is None: self.to_be_skipped = to_be_skipped_default else: self.to_be_skipped = to_be_skipped_func def _get_is_domainless(self): """Return True if this node has no input nor output fields.""" return (not self.input_fields) and (not self.output_fields) is_domainless = property(_get_is_domainless)
[docs] @classmethod def expand_tensor_fields(cls, fields): scalar_fields = () tensor_fields = () for field in fields: if field is None: scalar_fields += (None,) elif field.is_tensor: scalar_fields += field.fields tensor_fields += (field,) else: scalar_fields += (field,) return (scalar_fields, tensor_fields)
@debug def _setup_method(self, topgraph_method): """ Get method values from top graph method in computational graph, and combines it to user method to build final method dictionnary. """ cls = type(self) if topgraph_method: base_method = self.base_method avail_methods = self.available_methods() extra_keys = ( set(topgraph_method.keys()) .intersection(avail_methods.keys()) .difference(self.base_method.keys()) ) method = self.base_method.copy() for k in extra_keys: method[k] = topgraph_method[k] else: method = self.base_method method = self._check_method(method) return method @debug def _init_base( self, input_fields, output_fields, input_tensor_fields, output_tensor_fields, input_params, output_params, is_root=False, ): """ Initialize base class and check everything. """ # Merge scalar and tensor fields all_input_fields = tuple(input_tensor_fields) for ofield in input_fields.keys(): if not any(ofield in tf for tf in input_tensor_fields): all_input_fields += (ofield,) all_output_fields = tuple(output_tensor_fields) for ofield in output_fields.keys(): if not any(ofield in tf for tf in output_tensor_fields): all_output_fields += (ofield,) assert not self._base_initialized check_instance(input_fields, dict, keys=ScalarField) check_instance(output_fields, dict, keys=ScalarField) check_instance( input_params, dict, keys=Parameter, values=(MPIParams, type(None)) ) check_instance( output_params, dict, keys=Parameter, values=(MPIParams, type(None)) ) check_instance(input_tensor_fields, tuple, values=TensorField) check_instance(output_tensor_fields, tuple, values=TensorField) check_instance(all_input_fields, tuple, values=Field) check_instance(all_output_fields, tuple, values=Field) self.input_fields = input_fields self.output_fields = output_fields self.input_params = input_params self.output_params = output_params self.input_tensor_fields = input_tensor_fields self.output_tensor_fields = output_tensor_fields ifields = set(self.input_fields.keys()) ofields = set(self.output_fields.keys()) fields = tuple(ifields.union(ofields)) itfields = set(self.input_tensor_fields) otfields = set(self.output_tensor_fields) tfields = tuple(itfields.union(otfields)) iparams = set(self.input_params.keys()) oparams = set(self.output_params.keys()) parameters = tuple(iparams.union(oparams)) if ("mpi_params" in self.__kwds) and ( "ComputationalGraph" not in map(lambda c: c.__name__, self.__class__.__mro__) ): mpi_params = self.__kwds["mpi_params"] for topo in set(self.input_fields.values()).union( self.output_fields.values() ): if isinstance(topo, Topology) and (topo.mpi_params != mpi_params): d = topo.mpi_params.diff(mpi_params) msg = "MPI parameters mismatch between already specified topology mpi_params " msg += f"and operator MPI paramaters in operator {self.name}." msg += f"\n *operator: {mpi_params}" msg += f"\n *field: {topo.mpi_params}" msg += f"\n >diff : {d}\n" if not ( "task_id" in d.keys() and "comm" in d.keys() and not "on_task" in d.keys() ): raise RuntimeError(msg) super().__init__( name=self.name, fields=fields, tensor_fields=tfields, parameters=parameters, **self.__kwds, ) # Consolidate unkwnown mpi_params for parameters. for p in iparams: if self.input_params[p] is None: self.input_params[p] = self.mpi_params for p in oparams: if self.output_params[p] is None: self.output_params[p] = self.mpi_params # after consolidation : None value not allowed anymore check_instance(self.input_params, dict, keys=Parameter, values=MPIParams) check_instance(self.output_params, dict, keys=Parameter, values=MPIParams) self._base_initialized = True self.all_input_fields = all_input_fields self.all_output_fields = all_output_fields @debug def _check_method(self, user_method): """ Update user method with default method and check againt available methods. """ method = self.default_method().copy() if user_method is not None: method.update(user_method) available_methods = self.available_methods() for k, v in method.items(): if k not in available_methods.keys(): msg = "{} is not an available method key for computational node {}." msg = msg.format(k, self.name) warnings.warn(msg, HysopWarning) continue available = to_set(available_methods[k]) instances = {x for x in available if isinstance(x, InstanceOf)} available = available.difference(instances) good = False for instance in instances: if instance.match_instance(v): good = True break good = good or (v in available) if not good: msg = f"{v} is not an available method value for key {k.__name__}," msg += f"\n possible values are {available_methods[k]}." raise ValueError(msg) return method
[docs] @debug @base_initialized def check(self): """ Check if node was correctly initialized. By default this checks variables, topologies and support. """ self._check_variables() self._check_topologies() self._check_support()
@debug @base_initialized def _check_variables(self): """ Check input and output variables. Called automatically in ComputationalGraphNode.check() """ for variables in [self.input_fields, self.output_fields]: for k, v in variables.items(): if not isinstance(k, Field): msg = "Given key is not a continuous Field (got a {})." raise TypeError(msg.format(k.__class__)) if not isinstance(v, TopologyView): msg = f"Expected a Topology instance but got a {v.__class__}." msg += "\nAll topologies are expected to be set after " msg += ( "ComputationalGraph.get_field_requirements() has been called." ) raise TypeError(msg) @debug @base_initialized def _check_topologies(self): """ Sets topologies flags. _is_distributed _has_multiple_topologies _has_multiple_field_topologies Sets the following attributes: _multi_topo_fields (list of field that have at least two different topologies) Called automatically in ComputationalGraphNode.check() """ is_distributed = self.mpi_params.size > 1 has_multiple_topologies = False has_multiple_field_topologies = False multi_topo_fields = set() topos = tuple(self.input_fields.values()) + tuple(self.output_fields.values()) if topos: topo_ref = first_not_None(topos).topology for variables in [self.input_fields, self.output_fields]: for field, topo in variables.items(): if topo is not None and (topo.topology != topo_ref): has_multiple_topologies = True from hysop.core.mpi.redistribute import RedistributeInter if isinstance(self, RedistributeInter): for field in set(self.input_fields.keys()).union( set(self.output_fields.keys()) ): multi_topo_fields.add(field) has_multiple_field_topologies = True else: for ifield in self.input_fields: if ( ifield in self.output_fields and self.input_fields[ifield].topology != self.output_fields[ifield].topology ): multi_topo_fields.add(ifield) has_multiple_field_topologies = True self._is_distributed = is_distributed self._has_multiple_topologies = has_multiple_topologies self._has_multiple_field_topologies = has_multiple_field_topologies self._multi_topo_fields = multi_topo_fields @debug @base_initialized def _check_support(self): """ Check input and output variables topologies against the supported topologies of this node. See ComputationalGraphNode.supports_multiple_topologies() ComputationalGraphNode.supports_multiple_field_topologies() ComputationalGraphNode.supports_mpi() Called automatically in ComputationalGraphNode.check() """ cls = self.__class__ if (self._has_multiple_field_topologies) and ( not cls.supports_multiple_field_topologies() ): msg = "Graph operator '{}' does not support multiple topologies yet." msg += "\nTopology mismatch for continuous variable(s) {} between " msg += "input and output variables." msg = msg.format(self.name, [f.name for f in self._multi_topo_fields]) raise NotImplementedError(msg) if (self._has_multiple_topologies) and (not cls.supports_multiple_topologies()): msg = "Graph operator {} does not support multiple field topologies yet." msg = msg.format(self.node_tag) msg += "\n>Input topologies:" for field, topo in self.input_fields.items(): msg += f"\n *{field.short_description()} -> {topo.short_description()}" msg += "\n>Output topologies:" for field, topo in self.output_fields.items(): msg += f"\n *{field.short_description()} -> {topo.short_description()}" raise NotImplementedError(msg) if (self._is_distributed) and (not cls.supports_mpi()): msg = "\nMPI multi-process has not been implemented in graph operator '{}' yet!\n" msg = msg.format(type(self)) raise NotImplementedError(msg) # ComputationalGraphNode interface
[docs] @base_initialized def get_topologies(self): """ Returns all the topologies used in this operator. Topologies are organized by backend in a dictionnary. """ topologies = {} for topo in set(self.input_fields.values()).union(self.output_fields.values()): if topo is not None: topologies.setdefault(topo.backend, set()).add(topo) return topologies
[docs] def get_domains(self): """ Returns all the domains used in this operator. Domains are keys and values are operators that have variables defined on this domain. If this node has no domain (ie. no input or output variables), if fills the 'None' domain. """ domains = {} for field in set(self.input_fields.keys()).union(self.output_fields.keys()): domains.setdefault(field.domain, set()).add(self) if self.is_domainless: domains.setdefault(None, set()).add(self) return domains
[docs] @base_initialized def get_backends(self): """ Returns all the backends used in this operator as a set. """ return self.get_topologies().keys()
[docs] @abstractmethod def available_methods(self): """ Returns the available methods of this node. This should return a dictionary of method as keys and possible values as a scalar or an iterable. See hysop.types.InstanceOf to support specific class types. This is used to check user method input. """ pass
[docs] @abstractmethod def default_method(self): """ Returns the default method of this node. Default methods should be compatible with available_methods. If the user provided method dictionnaty misses some method keys, a default value for this key will be extracted from the default one. """ pass
[docs] @debug def handle_method(self, method): """ Method automatically called during initialization. This allow to extract method values after method preprocessing. Method preprocessing means: 1) complete user input with compatible top graph user inputs 2) complete the resulting dictionnary with the node default_method 3) check method against available_methods. The result of this process is fed as argument of this function. """ self.method = {k: v for (k, v) in method.items()}
[docs] @abstractmethod @debug def get_field_requirements(self): """ Called just after handle_method(), ie self.method has been set. Topology requirements are: 1) min and max ghosts for each input and output variables 2) allowed splitting directions for cartesian topologies 3) required local and global transposition state, if any. and more They are stored in self.input_field_requirements and self.output_field_requirements. Keys are continuous fields and values are of type hysop.fields.field_requirement.DiscreteFieldRequirements """ from hysop.fields.field_requirements import OperatorFieldRequirements return OperatorFieldRequirements()
[docs] @debug def get_node_requirements(self): """Called after get_field_requirements to get global node requirements.""" return NodeRequirements(self)
[docs] @debug def get_and_set_field_requirements(self): """ Calls get_field_requirements() on current node and set self.field_requirements. """ field_requirements = self.get_field_requirements() assert field_requirements is not None self._field_requirements = field_requirements node_requirements = self.get_node_requirements() assert isinstance(node_requirements, NodeRequirements) self._node_requirements = field_requirements node_requirements.check_and_update_reqs(field_requirements) return field_requirements
[docs] def get_input_field_requirements(self): """ Returns input field requirements for this node. """ freqs = self._field_requirements if freqs is None: msg = "{}.get_and_set_field_requirements() has not been called yet " msg += "on node {}." msg = msg.format(type(self).__name__, self.name) raise RuntimeError(msg) return self._field_requirements.input_field_requirements
[docs] def get_output_field_requirements(self): """ Returns output field requirements for this node. """ freqs = self._field_requirements if freqs is None: msg = "{}.get_and_set_field_requirements() has not been called yet " msg += "on node {}." msg = msg.format(type(self).__name__, self.name) raise RuntimeError(msg) return freqs.output_field_requirements
input_field_requirements = property(get_input_field_requirements) output_field_requirements = property(get_output_field_requirements)
[docs] @debug def handle_topologies(self, input_topology_states, output_topology_states): """ Called after all topologies have been set up. Topologies are available as values of self.input_fields and self.output_fields and are mapped by continuous Field. In addition input_topology_states are passed as argument contains input discrete topology states and output topology states that the graph builder determined. All input states and output topology states have to comply with the operator field requirements obtained with self.get_field_requirements(). """ from hysop.topology.topology import TopologyState check_instance(input_topology_states, dict, keys=Field, values=TopologyState) check_instance(output_topology_states, dict, keys=Field, values=TopologyState) self.topology_handled = True
[docs] @classmethod def get_topo_descriptor(cls, variables, field): if field in variables: return variables[field] tfields = tuple(filter(lambda x: x.is_tensor, variables.keys())) for tfield in tfields: if field in tfield: return variables[tfield] msg = "Could not find any topology descriptor corresponding to field {}." msg = msg.format(field.short_description()) raise KeyError(msg)
[docs] @classmethod def get_topo_discretization(cls, variables, field): topo = cls.get_topo_descriptor(variables=variables, field=field) return get_topo_descriptor_discretization(topo)
[docs] @classmethod def supports_multiple_topologies(cls): """ Should return True if this node supports multiple topologies. """ return True
[docs] @classmethod def supports_multiple_field_topologies(cls): """ Should return True if an input field that is also an output field can have an input topology different from its output topology. This is usefull in Redistribute like operators. If this returns True this implies supports_multiple_topologies(). It also implies that self.variables[field] may return a set of topologies. In this case one can recover input and output topologies by using self.input_fields[field] and self.output_fields[field]. In addition one can find such fields by using the list self.multi_topo_fields which is set after ComputationalGraphNode.initialize() has been called. """ return False
[docs] @classmethod def supports_mpi(cls): """ Return True if this operator was implemented to support multiple mpi processes. """ return False
[docs] @debug def pre_initialize(self, **kwds): """ Function called before initialization, can be used to alter variables set in __init__ like input_fields or output_fields. By default this does nothing. """ pass
[docs] @debug def post_initialize(self, **kwds): """ Function called after initialization, can be used to execute routines after handle_method has been called. By default this does nothing. """ pass
[docs] @debug def initialize(self, topgraph_method=None, **kwds): """ Initialize this node. Initialization step sets the following variables: *self.method, *self.input_field_requirements *self.output_field_requirements *self.initialized It returns self.method. Order of execution is: self.pre_initialize() self._setup_method() self.handle_method() self.get_field_requirements() self._initialized = True self.post_initialize() See ComputationalGraphNode.handle_method() to see how user method is handled. See ComputationalGraphNode.get_field_requirements() to see how topology requirements are handled. After this method has been handled by all operators, initialization collects min and max ghosts required by each operators which will be usefull in the discretiezation step to automatically build topologies or check against user supplied topologies. This function also sets the self.initialized flag to True (just before post initialization). Once this flag is set one may call ComputationalGraphNode.discretize(). """ if self.initialized: return method = self._setup_method(topgraph_method) self.handle_method(method) self.initialized = True return method
[docs] @debug @topology_handled def discretize(self): """ Discretize this operator. By default this just sets the self.discretized flag to True. Once this flag is set one may call ComputationalGraphNode.get_work_properties() and ComputationalGraphNode.setup(). """ self.discretized = True
[docs] @discretized def get_input_discrete_field(self, field): """ Helper function to get a discretized input field: *Get a DiscreteScalarField from an input ScalarField. *Get a DiscreteTensorField from an input TensorField. """ check_instance(field, Field) if self.input_discrete_fields is None: msg = "{}(name={}) \n => Discretization did not set self.input_discrete_fields." msg = msg.format(self.full_tag, self.name) raise RuntimeError(msg) if self.input_discrete_tensor_fields is None: msg = "{}(name={}) \n => Discretization did not set self.input_discrete_tensor_fields." msg = msg.format(self.full_tag, self.name) raise RuntimeError(msg) if field.is_tensor: if field not in self.input_tensor_fields: msg = "{} is not a registered input TensorField for graph node:\n{}" msg = msg.format(field.short_description(), self.long_description()) raise RuntimeError(msg) return self.input_discrete_tensor_fields[field] else: if field not in self.input_fields: msg = "{} is not a registered input ScalarField for graph node:\n{}" msg = msg.format(field.short_description(), self.long_description()) raise RuntimeError(msg) return self.input_discrete_fields[field]
[docs] @discretized def get_output_discrete_field(self, field): """ Helper function to get a discretized output field: *Get a DiscreteScalarField from an output ScalarField. *Get a DiscreteTensorField from an output TensorField. """ check_instance(field, Field) if self.output_discrete_fields is None: msg = "{}(name={}) \n => Discretization did not set self.output_discrete_fields." msg = msg.format(self.full_tag, self.name) raise RuntimeError(msg) if self.output_discrete_tensor_fields is None: msg = "{}(name={}) \n => Discretization did not set self.output_discrete_tensor_fields." msg = msg.format(self.full_tag, self.name) raise RuntimeError(msg) if field.is_tensor: if field not in self.output_tensor_fields: msg = "{} is not a registered output TensorField for graph node:\n{}" msg = msg.format(field.short_description(), self.long_description()) raise RuntimeError(msg) return self.output_discrete_tensor_fields[field] else: if field not in self.output_fields: msg = "{} is not a registered output ScalarField for graph node:\n{}" msg = msg.format(field.short_description(), self.long_description()) raise RuntimeError(msg) return self.output_discrete_fields[field]
[docs] @base_initialized def iter_input_fields(self, with_scalars=True, with_tensors=True, as_scalars=False): """ Iterate over all input fields. By default iterate over all tensors and scalars unless only_scalars or only_tensors is specified. as_scalars will ravel all tensors to scalars. """ assert with_scalars or with_tensors, "iterating over nothing" input_scalar_fields_from_tensors = { field for tfield in self.input_tensor_fields for field in tfield.fields } if with_tensors and (not as_scalars): yield from self.input_tensor_fields for field in self.input_fields: if field in input_scalar_fields_from_tensors: # field is contained in a tensor field if with_tensors and as_scalars: yield field else: # field is not contained in any tensor field if with_scalars: yield field
[docs] @base_initialized def iter_output_fields( self, with_scalars=True, with_tensors=True, as_scalars=False ): """ Iterate over all output fields. By default iterate over all tensors and scalars unless only_scalars or only_tensors is specified. as_scalars will ravel all tensors to scalars. """ assert with_scalars or with_tensors, "iterating over nothing" output_scalar_fields_from_tensors = { field for tfield in self.output_tensor_fields for field in tfield.fields } if with_tensors and (not as_scalars): yield from self.output_tensor_fields for field in self.output_fields: if field in output_scalar_fields_from_tensors: # field is contained in a tensor field if with_tensors and as_scalars: yield field else: # field is not contained in any tensor field if with_scalars: yield field
[docs] @discretized def iter_input_discrete_fields( self, with_scalars=True, with_tensors=True, as_scalars=False ): """ Iterate over all input (field, discrete_field) pairs. By default iterate over all tensors and scalars unless only_scalars or only_tensors is specified. as_scalars will ravel all tensors to scalars. """ assert with_scalars or with_tensors, "iterating over nothing" input_scalar_fields_from_tensors = { field for tfield in self.input_tensor_fields for field in tfield.fields } if with_tensors and (not as_scalars): yield from self.input_discrete_tensor_fields.items() for field, dfield in self.input_discrete_fields.items(): if field in input_scalar_fields_from_tensors: # field is contained in a tensor field if with_tensors and as_scalars: yield (field, dfield) else: # field is not contained in any tensor field if with_scalars: yield (field, dfield)
[docs] @discretized def iter_output_discrete_fields( self, with_scalars=True, with_tensors=True, as_scalars=False ): """ Iterate over all output (field, discrete_field) pairs. By default iterate over all tensors and scalars unless only_scalars or only_tensors is specified. as_scalars will ravel all tensors to scalars. """ assert with_scalars or with_tensors, "iterating over nothing" output_scalar_fields_from_tensors = { field for tfield in self.output_tensor_fields for field in tfield.fields } if with_tensors and (not as_scalars): yield from self.output_discrete_tensor_fields.items() for field, dfield in self.output_discrete_fields.items(): if field in output_scalar_fields_from_tensors: # field is contained in a tensor field if with_tensors and as_scalars: yield (field, dfield) else: # field is not contained in any tensor field if with_scalars: yield (field, dfield)
[docs] @debug @discretized def get_work_properties(self): """ Returns extra memory requirements of this node. This allows operators to request for temporary buffers that will be shared between operators in a graph to reduce the memory footprint and the number of allocations. By default this returns None, meanning that this node requires no extra buffers. """ return None
[docs] @debug @discretized def setup(self, work): """ Setup temporary buffer that have been requested in get_work_properties(). This function may be used to execute post allocation routines. This sets self.ready flag to True. Once this flag is set one may call ComputationalGraphNode.apply() and ComputationalGraphNode.finalize(). """ self.ready = True
[docs] @abstractmethod @ready def apply(self, simulation=None, **kwds): """ Abstract method that should be implemented. Applies this node (operator, computational graph operator...). """ pass
[docs] @debug @ready def finalize(self, **kwds): """ Cleanup this node (free memory from external solvers, ...) By default, this does nothing """ self.ready = False
[docs] def dump_inputs( self, fields=None, io_params=None, filename=None, frequency=None, fileformat=None, io_leader=None, **op_kwds, ): """ Tell this operator to dump some of its inputs before apply is called. Target folder, file, dump frequency and other io pameters are passed trough io_params or as keywords. """ from hysop.core.graph.computational_graph import ComputationalGraph if fields is not None: if isinstance(fields, Field): fields = (fields,) check_instance(fields, (set, list, tuple), values=Field) if self._base_initialized: for field in fields: if (field not in self.input_fields) and ( field not in self.input_tensor_fields ): msg = "Field {} is not an input field of operator {}." msg = msg.format(field.name, self.name) raise RuntimeError(msg) else: assert self._base_initialized, self.name fields = self.all_input_fields fields = list(sorted(fields, key=lambda f: f.name)) if io_params is None: io_params = self.io_params if io_params is None: msg = f"io_params was never set for operator {self.name}, please pass io_params to dump_inputs()." raise RuntimeError(msg) frequency = first_not_None(frequency, io_params.frequency) fileformat = first_not_None(fileformat, io_params.fileformat) io_leader = first_not_None(io_leader, io_params.io_leader) if filename is not None: pass elif fields is None: filename = f"{io_params.filename}_in" else: filename = "{}_{}".format( io_params.filename, "_".join(f"{f.name}in" for f in fields) ) io_params = IOParams( filename=filename, frequency=frequency, fileformat=fileformat, io_leader=io_leader, ) self._input_fields_to_dump.append((fields, io_params, op_kwds))
[docs] def dump_outputs( self, fields=None, io_params=None, filename=None, frequency=None, fileformat=None, io_leader=None, **op_kwds, ): """ Tell this operator to dump some of its outputs after apply is called. Target folder, file, dump frequency and other io pameters are passed trough instance io_params of this parameter or as keywords. """ from hysop.core.graph.computational_graph import ComputationalGraph if fields is not None: if isinstance(fields, Field): fields = (fields,) check_instance(fields, (set, list, tuple), values=Field) if self._base_initialized: for field in fields: if (field not in self.output_fields) and ( field not in self.output_tensor_fields ): msg = "Field {} is not an output field of operator {}." msg = msg.format(field.name, self.name) raise RuntimeError(msg) else: fields = self.all_output_fields fields = list(sorted(fields, key=lambda f: f.name)) if io_params is None: io_params = self.io_params if io_params is None: msg = f"io_params was never set for operator {self.name}." raise RuntimeError(msg) frequency = first_not_None(frequency, io_params.frequency) fileformat = first_not_None(fileformat, io_params.fileformat) io_leader = first_not_None(io_leader, io_params.io_leader) if filename is not None: pass elif fields is None: filename = f"{io_params.filename}_out" else: filename = "{}_{}".format( io_params.filename, "_".join(f"{f.name}out" for f in fields) ) io_params = IOParams( filename=filename, frequency=frequency, fileformat=fileformat, io_leader=io_leader, ) self._output_fields_to_dump.append((fields, io_params, op_kwds))
@property def node_tag(self): """ Return a tag of this node as a string. Cannot be used to differenciate nodes as some operators may have the same name. """ return f"{self.__class__.__name__}::{self.name}"
[docs] def long_description(self): sep = "\n *" ss = "{}[name={}, pname:{}]" ss = ss.format(self.full_tag, self.name, self.pretty_name) ss += "\n INPUT FIELDS:{}" if self.input_fields: ss = ss.format( sep + sep.join(f.short_description() for f in self.input_fields.keys()) ) else: ss = ss.format(" None") ss += "\n OUTPUT FIELDS:{}" if self.output_fields: ss = ss.format( sep + sep.join(f.short_description() for f in self.output_fields.keys()) ) else: ss = ss.format(" None") ss += "\n INPUT TENSOR FIELDS:{}" if self.input_tensor_fields: ss = ss.format( sep + sep.join(f.short_description() for f in self.input_tensor_fields) ) else: ss = ss.format(" None") ss += "\n OUTPUT TENSOR FIELDS:{}" if self.output_tensor_fields: ss = ss.format( sep + sep.join(f.short_description() for f in self.output_tensor_fields) ) else: ss = ss.format(" None") ss += "\n INPUT PARAMS:{}" if self.input_params: ss = ss.format( sep + sep.join(f.short_description() for f in self.input_params.keys()) ) else: ss = ss.format(" None") ss += "\n OUTPUT PARAMS:{}" if self.output_params: ss = ss.format( sep + sep.join(f.short_description() for f in self.output_params.keys()) ) else: ss = ss.format(" None") return ss